{
  "metadata" : {
    "name" : "Spark 101",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "2015-01-10T00:02:12.659Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "46CEC7C56C6A46358F2BEB3E10B556D0"
    },
    "cell_type" : "markdown",
    "source" : "<style>\n  h1, h2, h3, h4, h5, p, ul, li {\n    color: #2C475C;\n  }\n  .output_html {\n    color: skyblue;\n  }\n  hr { height: 2px; color: lightblue; }\n</style>"
  }, {
    "metadata" : {
      "id" : "44C028B12E234BD88A96264AEEC2E801"
    },
    "cell_type" : "markdown",
    "source" : "# Spark 101"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "B9C94C7E25F044A58CDDFC7704C2BDF9"
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark._\nimport org.apache.spark.SparkContext._\nimport org.apache.spark.rdd._",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "3E7214F9051D4FE5B7D270D403724E6F"
    },
    "cell_type" : "markdown",
    "source" : "### First create a dataset using the local `syslog` file"
  }, {
    "metadata" : {
      "id" : "5092C5E55D3E4D1884C459C7217BD0DC"
    },
    "cell_type" : "markdown",
    "source" : "We will \n\n*  load the file\n*  convert each line keeping its size\n*  remove the duplicates"
  }, {
    "metadata" : {
      "id" : "3D29F319694B49668F0063E62F122F29"
    },
    "cell_type" : "markdown",
    "source" : "For that, we'll use the `sparkContext`, which\n\n* is the driver\n* can define job (read inputs, transform, group, etc)\n* constructs DAG\n* schedules tasks on the cluster"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "12D7D03CCF3A4CBE8F2EC92AB2E79505"
    },
    "cell_type" : "code",
    "source" : "val dta:RDD[Int] = sparkContext.textFile(\"/var/log/syslog\")\n                               .map(_.size)\n                               .distinct",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "AE43FAFD60784CBD98C40212EA016E68"
    },
    "cell_type" : "markdown",
    "source" : "**MappedRDD** is actually an instance of `RDD[Int]` that will contain the distinct sizes of the lines."
  }, {
    "metadata" : {
      "id" : "E4C91D776DC24CADB4F46D8CB09D8348"
    },
    "cell_type" : "markdown",
    "source" : "_Note_: there is NO computations happening! → [see UI](http://localhost:4040/stages/)"
  }, {
    "metadata" : {
      "id" : "19DB25C9982E4CA4859C44FF20B893B0"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "0D7C5C9A9731442FA3F74601876C1F92"
    },
    "cell_type" : "markdown",
    "source" : "### Now we can use the size for fancy operations like grouping per last digit"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "4F238AE4CC96481A8AE60F84C5D06137"
    },
    "cell_type" : "code",
    "source" : "val rdd1:RDD[(Int, Iterable[Int])] = dta.groupBy(_ % 10)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "960FCC7D876048AFA8CFEDE073622214"
    },
    "cell_type" : "markdown",
    "source" : "### But we can also get rid of even sizes (... non trivially...), then _tupling_ with some other computations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "828B2401BAB84FEA8E8258C49FF6FB68"
    },
    "cell_type" : "code",
    "source" : "val rdd2 = dta.map(_ + 1)\n              .filter(_ % 2 == 0)\n              .map(x => (x%10, x*x))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "B124A3E5C36B4AF495F4E17C1555602D"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "3C998EA9FBB34ADBB67A6E37AED29291"
    },
    "cell_type" : "markdown",
    "source" : "### We can combine distributed datasets into single ones, by _joining_ them for instance."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0ECFCF9DC9DE4893BE258C0E28126A7F"
    },
    "cell_type" : "code",
    "source" : "val joined = rdd1.join(rdd2)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "99D1D1AF025445B781FD195852AEA041"
    },
    "cell_type" : "markdown",
    "source" : "_Note (again)_: still nothing done on the cluster up to here → [see ui](http://localhost:4040/stages/)"
  }, {
    "metadata" : {
      "id" : "A0AD7A11A3724A3480C504A883437277"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "0B6EC3D5FD224D7980EB3CD1D7E2C09E"
    },
    "cell_type" : "markdown",
    "source" : "#### Now we ask the cluster to do the whole thing: Action"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "16463E75AFD04BE88E1C589B088341BA"
    },
    "cell_type" : "code",
    "source" : "joined.take(10).toList.mkString(\"\\n\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CBFB95C4F51442C886C8C81E2E3BE5B3"
    },
    "cell_type" : "markdown",
    "source" : "_Note (yeah)_: NOW there were some computations in the cluster → [see stages](http://localhost:4040/stages/) and [see tasks](http://localhost:4040/stages/stage/?id=3&attempt=0)"
  }, {
    "metadata" : {
      "id" : "F9856142C7D04653BB80CB33944AD7E7"
    },
    "cell_type" : "markdown",
    "source" : "-----"
  }, {
    "metadata" : {
      "id" : "220F90FFC9464168A97C2A8D6F5BA3F7"
    },
    "cell_type" : "markdown",
    "source" : "## But what just happened?"
  }, {
    "metadata" : {
      "id" : "5E9CEC67863944C19882273526A0DEBD"
    },
    "cell_type" : "markdown",
    "source" : "### First Spark created a DAG based on the job definition"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A91299C34A6A4B758B46D8280DA27CB7"
    },
    "cell_type" : "code",
    "source" : "joined.toDebugString",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0248A08A23894C3A8FB617ADC1B4CC12"
    },
    "cell_type" : "markdown",
    "source" : "### Then it scheduled it to the executors in the cluster <small>only one when running in local mode<small>"
  }, {
    "metadata" : {
      "id" : "D6C71E9A63894B4E8B20A2662B5361DF"
    },
    "cell_type" : "markdown",
    "source" : "We can check the <strong>Total tasks</strong> activity in the [UI](http://localhost:4040/executors/)"
  }, {
    "metadata" : {
      "id" : "DAB44BF35B4141EF8C61E10E322E2199"
    },
    "cell_type" : "markdown",
    "source" : "-------"
  }, {
    "metadata" : {
      "id" : "DE361E8132064C35B4C4EFFCF8B2A40E"
    },
    "cell_type" : "markdown",
    "source" : "## Now we will prepare the dataset and then using it several times"
  }, {
    "metadata" : {
      "id" : "93108F8366B54B04B4989B6FC56A5DF1"
    },
    "cell_type" : "markdown",
    "source" : "So we'll read a file about stock price per day, so let's create a type holding relevant data."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A94CC65A4C6742F38A7DE218C3E9429D"
    },
    "cell_type" : "code",
    "source" : "case class Quote(stock:String, date:String, price:Double) extends java.io.Serializable",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "E55DDA65D10C4F128F2B4B677B4E98DB"
    },
    "cell_type" : "markdown",
    "source" : "The file will contain lines like:\n``` \nASTE,2011-12-06,33.93\nASTE,2012-03-14,36.84\n```"
  }, {
    "metadata" : {
      "id" : "8D7E41D23D544E36A1B986DB370AAB0A"
    },
    "cell_type" : "markdown",
    "source" : "Let's download the data first"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F3168A0EFCEA4813A8F5F44D57423710"
    },
    "cell_type" : "code",
    "source" : "import sys.process._\n\"mkdir -p /tmp/data\"!!\n\nif (!new java.io.File(\"/tmp/data/closes.csv\").exists)\n  \"wget https://s3-eu-west-1.amazonaws.com/spark-notebook-data/closes.csv -O /tmp/data/closes.csv\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F41892F4F09947488E5688C59A140C2C"
    },
    "cell_type" : "code",
    "source" : ":sh du -h /tmp/data/closes.csv",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "64A4F31B05B34470826FEDD14D3220C6"
    },
    "cell_type" : "code",
    "source" : "val closes:RDD[Quote] = sparkContext.textFile(\"/tmp/data/closes.csv\")\n                                   .map(_.split(\",\").toList)\n                                   .map{ case s::d::p::Nil => Quote(s, d, p.toDouble)}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "B42BE2395000448788544554D90EF13F"
    },
    "cell_type" : "markdown",
    "source" : "We have date, so we can group stock prices per day"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7E461A6F79064E82BEF457C44A6FD290"
    },
    "cell_type" : "code",
    "source" : "val byDate = closes.keyBy(_.date)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "5B898CB11B214AFB8A8300A1E67CC853"
    },
    "cell_type" : "markdown",
    "source" : "Now we can compute the minimum stocks per date"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F8E9582D50F4449E874EAD7BFD09BED2"
    },
    "cell_type" : "code",
    "source" : "def minByDate = byDate.combineByKey[(String, Double)](                                                                                           // `def` to force spark recomputing... otherwise it's smart enough to reuse previous RDDs...\n  (x:Quote) => (x.stock, x.price), \n  (d:(String, Double), l:Quote) => if (d._2 < l.price) d else (l.stock, l.price),\n  (d1:(String, Double), d2:(String, Double)) => if (d1._2 < d2._2) d1 else d2\n)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "35B0B9FEC268438196978CB4856AA4D6"
    },
    "cell_type" : "code",
    "source" : "<pre>{minByDate.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D6A99D77B1C1404E821A8A68011A4733"
    },
    "cell_type" : "markdown",
    "source" : "It took ~2 seconds (in local[8] and 24G of RAM)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "9C8B7A146E404C5087F4734C48C5E251"
    },
    "cell_type" : "code",
    "source" : "<pre>{minByDate.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "994354908DA34A638C2C99BFA5573B2A"
    },
    "cell_type" : "markdown",
    "source" : "Once again.... 2 seconds!!!"
  }, {
    "metadata" : {
      "id" : "CCAE12ADF4BC4195B4D5C2428CEFA221"
    },
    "cell_type" : "markdown",
    "source" : "#### Solution: caching!"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0E3E7BA6B0654C1082700F1CB46545E5"
    },
    "cell_type" : "code",
    "source" : "val maxByDate2 = byDate.combineByKey[(String, Double)](\n  (x:Quote) => (x.stock, x.price), \n  (d:(String, Double), l:Quote) => if (d._2 > l.price) d else (l.stock, l.price),\n  (d1:(String, Double), d2:(String, Double)) => if (d1._2 > d2._2) d1 else d2\n)\n\nmaxByDate2.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "A87AE9D86357497F8F56B65231041D12"
    },
    "cell_type" : "markdown",
    "source" : "Ask some data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E6A6AF0C873B41A4834CCE29B222BB5C"
    },
    "cell_type" : "code",
    "source" : "<pre>{maxByDate2.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "72317C1AFA09488E9B5E61BC76FC4C38"
    },
    "cell_type" : "markdown",
    "source" : "**Go to [UI](http://localhost:4040/storage/)**"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "051E00657F2041DD8A874D89566D9D2C"
    },
    "cell_type" : "code",
    "source" : "<pre>{maxByDate2.take(2).toList.mkString(\"\\n\")}</pre>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "6C954E9F8CD24401878288E3B86FD287"
    },
    "cell_type" : "markdown",
    "source" : "**BLAZING FAST** => Reuses the cache!"
  } ],
  "nbformat" : 4
}